1 /*
2  * Collie - An asynchronous event-driven network framework using Dlang development
3  *
4  * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd 
5  *
6  * Developer: putao's Dlang team
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module collie.channel.tcpsockethandler;
12 
13 import collie.net;
14 import collie.channel.handler;
15 import collie.channel.handlercontext;
16 import kiss.net;
17 import kiss.exception;
18 import kiss.net.TcpStream;
19 import kiss.event.task;
20 
21 final @trusted class TCPSocketHandler : HandlerAdapter!(const(ubyte[]), StreamWriteBuffer)
22 {
23     this(TcpStream sock)
24     {
25 		restSocket(sock);
26     }
27 
28 	@property tcpSocket(){return _socket;}
29 
30 	void restSocket(TcpStream sock)
31 	{
32 		_socket = sock;
33 		_loop = cast(EventLoop) sock.eventLoop();
34 	}
35 
36     override void transportActive(Context ctx)
37     {
38         attachReadCallback();
39         _socket.start();
40         ctx.fireTransportActive();
41     }
42 
43     override void transportInactive(Context ctx)
44     {
45         if (_isAttch && _socket) {
46             _socket.close();
47 		} else {
48         	ctx.fireTransportInactive();
49 		}
50     }
51 
52     override void write(Context ctx, StreamWriteBuffer buffer, TheCallBack cback = null)
53     {
54 		if(_loop.isInLoopThread()){
55 			_postWrite(buffer);
56 		} else {
57 			_loop.postTask(newTask(&_postWrite,buffer));
58 		}
59 
60     }
61 
62     override void close(Context ctx)
63     {
64 		_loop.postTask(newTask(&_postClose));
65     }
66 
67 protected:
68     void attachReadCallback()
69     {
70         _isAttch = true;
71         _socket.onDataReceived(&readCallBack);
72         _socket.onClosed(&closeCallBack);
73         context.pipeline.transport(_socket);
74     }
75 
76     void closeCallBack() nothrow
77     {
78         _isAttch = false;
79         catchAndLogException((){
80             context.fireTransportInactive();
81             context.pipeline.transport(null);
82             _socket.onDataReceived(null);
83             _socket.onClosed(null);
84             _socket = null;
85             context.pipeline.deletePipeline();
86         }());
87     }
88 
89     void readCallBack(in ubyte[] buf) nothrow
90     {
91         catchAndLogException(
92             context.fireRead(buf)
93         );
94     }
95 
96 private:
97 	final void _postClose(){
98 		if (_socket)
99 			_socket.close();
100 	}
101 	
102 	final void _postWrite(StreamWriteBuffer buffer)
103 	{
104 		if(_socket is null){
105 			buffer.doFinish();
106 			return;
107 		}
108 		if (context.pipeline.pipelineManager)
109 			context.pipeline.pipelineManager.refreshTimeout();
110 		_socket.write(buffer);
111 	}
112 private:
113     bool _isAttch = false;
114     TcpStream _socket;
115     EventLoop _loop;
116 }